package com.apache.spark.app

import com.alibaba.fastjson.{JSON, JSONObject}
import com.apache.spark.bean.{DauInfo, PageLog}
import com.apache.spark.utils.{MyBeanUtils, MyEsUtils, MyKafkaUtils, MyOffsetsUtils, MyRedisUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.{Jedis, Pipeline}

import java.text.SimpleDateFormat
import java.time.{LocalDate, Period}
import java.{lang, util}
import java.util.Date
import scala.collection.mutable.ListBuffer


/**
 * 日活宽表
 * 1, 准备实时环境
 * 2, 从Redis中读取偏移量
 * 3, 从Kafka中消费数据
 * 4, 提取偏移量结束点
 * 5, 处理数据
 * 5.1 转换数据结构
 * 5.2 去重
 * 5.2 维度关联
 *
 * 6, 写入ES
 * 7, 提交offsets
 */
object DwdDauApp {
  def main(args: Array[String]): Unit = {

    //0.还原状态
    revertState()

    //1.准备实时环境
    val conf: SparkConf = new SparkConf().setAppName("dwd_dau_app").setMaster("local[*]")
    val scc: StreamingContext = new StreamingContext(conf, Seconds(5))

    //2.从redis中读取offset
    val topicName: String = "DWD_PAGE_LOG_TOPIC"
    val groupId: String = "DWD_DAU_GROUP"
    val offsets: Map[TopicPartition, Long] = MyOffsetsUtils.readOffset(topicName, groupId)

    //3.从Kafka中消费数据
    var kafkaDStream: InputDStream[ConsumerRecord[String, String]] = null
    if (offsets != null && offsets.nonEmpty) {
      kafkaDStream = MyKafkaUtils.getKafkaDStream(scc, topicName, groupId, offsets)
    } else {
      kafkaDStream = MyKafkaUtils.getKafkaDStream(scc, topicName, groupId)
    }

    //4.提取offset结束点
    var offsetRanges: Array[OffsetRange] = null
    val offsetRangesDStream: DStream[ConsumerRecord[String, String]] = kafkaDStream.transform(
      rdd => {
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }
    )

    //5.处理数据
    //5.1 转换数据结构
    val pageLogDStream: DStream[PageLog] = offsetRangesDStream.map(
      consumerRecord => {
        val value: String = consumerRecord.value()
        val pageLog: PageLog = JSON.parseObject(value, classOf[PageLog])
        pageLog
      }
    )

    //拿到页面访问数据,打印测试
    //pageLogDStream.print(100)

    pageLogDStream.cache()
    pageLogDStream.foreachRDD(
      rdd => println("自我审查前：" + rdd.count())
    )

    //5.2 去重
    //自我审查：将页面访问数据中last_page_id不为空的数据过滤掉
    val filterDStream: DStream[PageLog] = pageLogDStream.filter(
      pageLog => pageLog.last_page_id == null
    )

    //打印
    filterDStream.foreachRDD(
      rdd => {
        println("自我审查后：" + rdd.count())
        println("----------------------------")
      }
    )

    //第三方审查：通过redis将当日活跃的mid维护起来，自我审查后的每条数据需要到redis中进行对比去重
    //redis中如何维护日活状态 (一对多场景)
    //类型: string list set zset hash -> (list set) -> (set)
    //key: DAU:DATE (日期)
    //value: mid的集合
    //写入API: lpush/rpush  sadd
    //读取API: lrange   smembers
    //过期: 24小时

    //filterDStream.filter() 每条数据执行一次，redis的连接太频繁
    // [A , B , C] => [AA , BB , CC]
    val redisFilterDStream: DStream[PageLog] = filterDStream.mapPartitions(
      pageLogIter => {
        //测试: 第三方审查之前
        val pageLogList: List[PageLog] = pageLogIter.toList
        println("第三方审查前: " + pageLogList.size)

        //存储要的数据
        val pageLogs: ListBuffer[PageLog] = ListBuffer[PageLog]()
        //时间格式化
        val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
        val jedis: Jedis = MyRedisUtils.getJedisFromPool()
        //按照mid来做去重
        for (pageLog <- pageLogList) {
          //提取每条数据中的mid(日活统计基于mid,也可以基于uid)
          val mid: String = pageLog.mid

          //获取日期，因为测试不同天的数据，不能直接获取系统时间
          val ts: Long = pageLog.ts
          val date: Date = new Date(ts)
          val dateStr: String = sdf.format(date)
          val redisDauKey: String = s"DAU:$dateStr"

          //redis的判断是否包含操作

          /*
            下面代码在分布是环境中，存在并发问题，可能多个并行度同时进入到if中，导致最终保留多条同一个mid的数据
          //list
          val mids: util.List[String] = jedis.lrange(redisDauKey, 0, -1)
          //如果 不包含mid,则添加过去,并加入ListBuffer缓存
          if (!mids.contains(mid)){
            jedis.lpush(redisDauKey,mid)
            pageLogs.append(pageLog)
          }

          /**
           * list代码是有问题的，如果如果是单点环境，没有问题，但是环境是分步式的，是有问题的，会产生同步问题
           */

          //set (操作同list)
          val setMids: util.Set[String] = jedis.smembers(redisDauKey)
          if (!setMids.contains(mid)){
            jedis.sadd(redisDauKey,mid)
            pageLogs.append(pageLog)
          }

           */

          //定义Set类型，添加的数据后的最后结果 (1:成功 , 0:失败)
          //TODO 判断包含和写入实现了原子操作
          val isNew: lang.Long = jedis.sadd(redisDauKey, mid)
          //如果放入的数据的结果是1，则加入成功
          if (isNew == 1L) {
            pageLogs.append(pageLog)
          }
        }

        //关闭redis
        jedis.close()
        //测试: 第三方审查后
        println("第三方审查后测试: " + pageLogs.size)
        pageLogs.iterator
      }
    )

    //控制台打印测试:
    //redisFilterDStream.print()

    //5.3 维度关联
    val dauInfoDStream: DStream[DauInfo] = redisFilterDStream.mapPartitions(
      pageLogIter => {
        val dauInfos: ListBuffer[DauInfo] = ListBuffer[DauInfo]()
        //时间格式化
        val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        //开启Redis连接
        val jedis: Jedis = MyRedisUtils.getJedisFromPool()

        for (pageLog <- pageLogIter) {
          //1.将pageLog中以后的字段拷贝到DauInfo中
          val dauInfo: DauInfo = new DauInfo()
          //笨办法: 将pageLog中的每个字段的值挨个提取，赋值给dauInfo
          //dauInfo.mid = pageLog.mid
          //好办法: 通过对象拷贝来完成
          MyBeanUtils.copyProperties(pageLog, dauInfo)

          //2.补充维度
          //2.1 用户信息维度
          val uid: String = pageLog.user_id
          val redisUidKey: String = s"DIM:USER_INFO:$uid"
          //Redis获取数据
          val userInfoJson: String = jedis.get(redisUidKey)
          //转换为Json格式
          val userInfoJsonObj: JSONObject = JSON.parseObject(userInfoJson)

          //提取性别
          val gender: String = userInfoJsonObj.getString("gender")

          //提取生日
          val birthday: String = userInfoJsonObj.getString("birthday") //拿到生日

          //换算年龄
          val birthdayLd: LocalDate = LocalDate.parse(birthday) //获取生日(数字)
          val nowLd: LocalDate = LocalDate.now() //获取当下数字
          val period: Period = Period.between(birthdayLd, nowLd) //取得年龄
          val age: Int = period.getYears

          //补充到对象中
          dauInfo.user_gender = gender
          dauInfo.user_age = age.toString

          //2.2 地区信息维度
          //redis中
          //现在: DIM:BASE_PROVINCE:1
          //之前: DIM:BASE_PROVINCE:110000
          val provinceID: String = dauInfo.province_id
          val redisProvinceKey: String = s"DIM:BASE_PROVINCE:$provinceID"
          //获取数据
          val provinceJson: String = jedis.get(redisProvinceKey)
          val provinceJsonObj: JSONObject = JSON.parseObject(provinceJson)
          val provinceName: String = provinceJsonObj.getString("name")
          val provinceIsoCode: String = provinceJsonObj.getString("iso_code")
          val province3166: String = provinceJsonObj.getString("iso_3166_2")
          val provinceAreaCode: String = provinceJsonObj.getString("area_code")

          //补充到对象中
          dauInfo.province_name = provinceName
          dauInfo.province_iso_code = provinceIsoCode
          dauInfo.province_3166_2 = province3166
          dauInfo.province_area_code = provinceAreaCode

          //2.3 日期字段处理
          val date: Date = new Date()
          val dtHr: String = sdf.format(date)
          val dtHrStr: Array[String] = dtHr.split(" ")
          val dt: String = dtHrStr(0)
          val hr: String = dtHrStr(1).split(":")(0)

          //补充到对象中
          dauInfo.dt = dt
          dauInfo.hr = hr

          dauInfos.append(dauInfo)
        }

        jedis.close()
        dauInfos.iterator
      }
    )

    //测试
    //dauInfoDStream.print(100)


    //写到OLAP中
    //按照天分割索引，通过索引模板控制mapping,settings,aliases等
    //准备ES工具类
    dauInfoDStream.foreachRDD(
      rdd => {
        rdd.foreachPartition(
          dauInfoIter => {
            val docs: List[(String, DauInfo)] = dauInfoIter.map(dauInfo => (dauInfo.mid, dauInfo)).toList

            if (docs.size > 0) {
              //索引名
              //如果是真实的实时环境，直接获取当前日期即可，因为此项目是模拟，会生成不同天的数据
              //从第一条数据中获取日期
              val head: (String, DauInfo) = docs.head
              val ts: Long = head._2.ts
              val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
              val dateStr: String = sdf.format(new Date(ts))

              val indexName: String = s"gmall_dau_info_$dateStr"
              //写入到ES中
              MyEsUtils.bulkSave(indexName, docs)
            }
          }
        )
        //提交offset
        MyOffsetsUtils.saveOffset(topicName, groupId, offsetRanges)
      }
    )


    scc.start()
    scc.awaitTermination()
  }

  /**
   * 状态还原
   *
   * 在每次启动实时任务时，进行一次状态还原，以ES为准，将所有的mid提取出来，覆盖到redis中
   */

  def revertState(): Unit = {
    //从ES中查询到所有的mid
    val date: LocalDate = LocalDate.now() //只能生成当前日期
    val indexName: String = s"gmall_dau_info$date"
    val fieldName: String = "mid"
    val mids: List[String] = MyEsUtils.searchField(indexName, fieldName)

    //删除redis中记录的状态(所有mid)
    val jedis: Jedis = MyRedisUtils.getJedisFromPool()
    val redisDauKey: String = s"DAU:$date"
    jedis.del(redisDauKey)

    //将从ES中查询到的mid覆盖到redis中
    if (mids != null && mids.size > 0) {
      //        for (mid <- mids){
      //          jedis.sadd(redisDauKey,mid)
      //        }
      val pipeline: Pipeline = jedis.pipelined()
      for (mid <- mids) {
        pipeline.sadd(redisDauKey, mid) //不会直接到redis执行
      }

      //到redis执行
      pipeline.sync()
    }

    //关闭redis连接
    jedis.close()

  }
}
